package com.xherorlclass.core;

import com.xherorlclass.NettyBootstrapInitializer;
import com.xherorlclass.XherorlRpcBootstrap;
import com.xherorlclass.compress.CompressorFactory;
import com.xherorlclass.discovery.Registry;
import com.xherorlclass.enumeration.RequestType;
import com.xherorlclass.serialize.SerializerFactory;
import com.xherorlclass.transport.message.XherorlRpcRequest;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author ：Xherorl
 * @date ：Created in 2023/8/29 14:08
 * @description：心跳探测的核心目的是什么？探活，感知哪些服务器的连接状态是正常的，哪些是不正常的
 */
@Slf4j
public class HeartbeatDetector {

    public static void detectHeartbeat(String serviceName){
        // 1、从注册中心拉取服务列表并建立连接
        Registry registry = XherorlRpcBootstrap.getInstance().getConfiguration().getRegistryConfig().getRegistry();
        List<InetSocketAddress> addresses = registry.lookup(serviceName, XherorlRpcBootstrap.getInstance().getConfiguration().getGroup());

        // 将连接缓存
        for (InetSocketAddress address : addresses) {
            try {
                if (!XherorlRpcBootstrap.CHANNEL_CACHE.containsKey(address)){
                    Channel channel = NettyBootstrapInitializer.getBootstrap().connect(address).sync().channel();
                    XherorlRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                }
            }catch (InterruptedException e){
                throw new RuntimeException(e);
            }
        }

        // 3、任务，定期发送消息
        Thread thread = new Thread(() -> new Timer().scheduleAtFixedRate(new MyTimerTask(), 0, 2000), "xherorlRpc-HeartbeatDetector-thread");
        // 守护线程
        thread.setDaemon(true);
        thread.start();

    }

    private static class MyTimerTask extends TimerTask {
        @Override
        public void run() {
            // 将响应时长的map清空
            XherorlRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();

            // 遍历所有的channel
            Map<InetSocketAddress, Channel> cache = XherorlRpcBootstrap.CHANNEL_CACHE;

            for (Map.Entry<InetSocketAddress, Channel> entry : cache.entrySet()) {
                // 定义一个重试的次数
                int tryTimes = 3;
                while (tryTimes > 0){
                    // 通过心跳检测处理每一个channel
                    Channel channel = entry.getValue();

                    long start = System.currentTimeMillis();

                    // 构建一个心跳检测
                    XherorlRpcRequest xherorlRpcRequest = XherorlRpcRequest.builder()
                            .requestId(XherorlRpcBootstrap.getInstance().getConfiguration().getIdGenerator().getId())
                            .compressType(CompressorFactory.getCompressor(XherorlRpcBootstrap.getInstance().getConfiguration().getCompressType()).getCode())
                            .requestType(RequestType.HEART_BEAT.getId())
                            .serializeType(SerializerFactory.getSerializer(XherorlRpcBootstrap.getInstance().getConfiguration().getSerializeType()).getCode())
                            .timeStamp(start)
                            .build();

                    // 4、写出报文
                    CompletableFuture<Object> completableFuture = new CompletableFuture<>();
                    // 将 completableFuture 暴露出去
                    XherorlRpcBootstrap.PENDING_REQUEST.put(xherorlRpcRequest.getRequestId(), completableFuture);

                    channel.writeAndFlush(xherorlRpcRequest).addListener((ChannelFutureListener) promise -> {
                        if (!promise.isSuccess()) {
                            completableFuture.completeExceptionally(promise.cause());
                        }
                    });

                    Long endTime = 0L;
                    try {
                        // 阻塞方法，get()方法如果得不到结果，就会一直阻塞
                        // 我们想不一直阻塞可以添加参数
                        completableFuture.get(1, TimeUnit.SECONDS);
                        endTime = System.currentTimeMillis();
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                        // 一旦发生问题，需要优先重试
                        tryTimes--;
                        log.error("和地址为【{}】的主机连接发生异常.正在进行第【{}】次重试......",channel.remoteAddress(), 3-tryTimes);

                        // 将重试的机会用尽，将失效的地址移出服务列表
                        if (tryTimes == 0 ){
                            XherorlRpcBootstrap.CHANNEL_CACHE.remove(entry.getKey());
                        }

                        // 尝试等待一段时间后重试
                        try {
                            Thread.sleep(10*(new Random().nextInt(5)));
                        }catch (InterruptedException ex){
                            throw new RuntimeException(ex);
                        }

                        continue;
                    }
                    Long time = endTime - start;

                    // 使用treemap进行缓存
                    XherorlRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time, channel);
                    log.debug("和【{}】服务器的响应时间是【{}】。", entry.getKey(), time);
                    break;
                }
            }

            log.info("-----------------------响应时间的treemap----------------------");
            for (Map.Entry<Long, Channel> entry : XherorlRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.entrySet()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}]--->channelId:[{}]", entry.getKey(), entry.getValue().id());
                }
            }
        }
    }
}
